Flink + Kafka, java.lang.OutOfMemoryError, когда параллелизм> 1 - PullRequest
0 голосов
/ 07 марта 2019

У меня есть игрушка Flink, которая читает 3 темы кафки, а затем объединяет все эти 3 потока.Вот и все, никакой дополнительной работы.

Если использовать параллелизм 1 для моей работы Flink, все выглядит нормально, так как при изменении параллелизма> 1 происходит сбой:

java.lang.OutOfMemoryError: Direct buffer memory
    at java.nio.Bits.reserveMemory(Bits.java:693)
    at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
    at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
    at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241)
    at sun.nio.ch.IOUtil.read(IOUtil.java:195)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
    at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:110)
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:97)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:169)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:150)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:355)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1047)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:257)

Как получилосьон работает с параллелизмом 1, но не с параллелизмом> 1?

Это связано с настройкой стороны сервера kafka?Или это связано с настройкой потребителя в моем коде Java (в моем коде пока нет специальных настроек)?

Я знаю, что предоставленной здесь информации может быть недостаточно, но я не могу прикоснуться к кластеру кафки.Я просто надеюсь, что некоторые гуру могут случайно столкнуться с той же ошибкой и могут поделиться со мной некоторыми советами.

Я использую kafka 0.10, flink 1.5.

Большое спасибо.

1 Ответ

1 голос
/ 07 марта 2019

Как вы можете видеть в журналах ошибок, эта ошибка из вашего кластера Kafka.Эта проблема возникает, когда память прямого буфера брокера Kafka превышает размер кучи , назначенный JVM.Прямая буферная память выделяется из кучи JVM, как того требует приложение.Когда вы используете параллелизм> 1, несколько задач Flink, min (Количество слотов Flink, Количество разделов Kafka) будут одновременно использовать данные из Kafka, что приведет к большему использованию размера кучи брокеров Kafka по сравнениюкогда параллелизм равен единице и произойдет так называемая ошибка.Стандартным решением является увеличение размера кучи, доступного для Kafka Brokers, путем добавления переменной KAFKA_HEAP_OPTS в файл env Kafka или в качестве переменной среды ОС .Например, добавьте следующую строку, чтобы установить размер кучи равным 2 ГБ:

export KAFKA_HEAP_OPTS="-Xms2G -Xmx2G"

Но в вашем случае, когда нет доступа к брокеру Kafka (в зависимости от вашего вопроса), вы можете уменьшить количествозапись возвращается в одном вызове poll (), поэтому потребность в памяти кучи у брокеров будет уменьшена.(Это не стандартное решение, я рекомендую просто исчезнуть ошибка).

Из этого ответа :

Потребители Kafka обрабатывают данные о невыполненных работах по следующим двум параметрам:

max.poll.interval.ms
Максимальная задержка между вызовами опроса () при использовании управления группами потребителей.Это накладывает верхнюю границу на количество времени, в течение которого потребитель может бездействовать до получения большего количества записей.Если poll () не вызывается до истечения этого тайм-аута, то считается, что потребитель потерпел неудачу, и группа будет перебалансирована, чтобы переназначить разделы другому участнику.Значение по умолчанию - 300000.

max.poll.records
Максимальное количество записей, возвращаемых за один вызов poll ().Значение по умолчанию - 500.

Игнорирование установки вышеупомянутых двух параметров в соответствии с требованием может привести к опросу максимальных данных, которые потребитель не сможет обработать с помощью доступных ресурсов, что приведет к OutOfMemory или к отказусовершать смещение потребителя в разы.Следовательно, всегда желательно использовать параметры max.poll.records и max.poll.interval.ms.

Поэтому для теста уменьшите значение max.poll.записывает , например, 250 и проверяет, не произойдет ли пока ошибка.

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", BOOTSTRAPSERVERS);
properties.setProperty("group.id", ID);
properties.setProperty("key.deserializer", Serializer);
properties.setProperty("value.deserializer", Deserializer);
properties.setProperty("max.poll.records", "250");

FlinkKafkaConsumer08<String> myConsumer =
    new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties);
...